package com.atguigu.app.func;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.bean.TableProcess;
import com.atguigu.common.GmallConfig;
import com.atguigu.utils.DruidDSUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;

/**
 * Connects a main stream of change records with a broadcast stream of table-configuration rows.
 *
 * <p>Each broadcast element is parsed into a {@link TableProcess}; the sink table it describes is
 * created if absent and the configuration is stored in broadcast state keyed by source table name.
 * Each main-stream element is looked up in that state by its {@code table} field; matching
 * insert/update/bootstrap-insert records are stripped down to the configured sink columns, tagged
 * with a {@code sinkTable} field and emitted.
 */
public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {

    private MapStateDescriptor<String, TableProcess> mapStateDescriptor;
    private DruidDataSource druidDataSource;

    /**
     * No-arg constructor. Leaves the state descriptor unset; both process methods need the
     * descriptor, so prefer {@link #TableProcessFunction(MapStateDescriptor)}.
     */
    public TableProcessFunction() {
    }

    /**
     * @param mapStateDescriptor descriptor of the broadcast state mapping source table name to
     *                           its {@link TableProcess} configuration
     */
    public TableProcessFunction(MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
        this.mapStateDescriptor = mapStateDescriptor;
    }

    /**
     * Obtains the connection pool used for executing the create-table statements.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        druidDataSource = DruidDSUtil.createDataSource();
    }

    /**
     * Releases the connection pool obtained in {@link #open(Configuration)}.
     *
     * <p>NOTE(review): assumes {@code DruidDSUtil.createDataSource()} hands out an instance owned
     * by this function rather than a pool shared with other operators - confirm.
     */
    @Override
    public void close() throws Exception {
        if (druidDataSource != null) {
            druidDataSource.close();
        }
    }

    //Sample value:{"before":null,"after":{"source_table":"base_trademark","sink_table":"dim_base_trademark","sink_columns":"id,tm_name","sink_pk":"id","sink_extend":null},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1655172926148,"snapshot":"false","db":"gmall-211227-config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1655172926150,"transaction":null}
    /**
     * Handles one configuration row from the broadcast stream.
     *
     * <p>Rows whose {@code after} field is null are ignored. Otherwise the sink table is created
     * if needed and the configuration is written to broadcast state under its source table name.
     *
     * @param value JSON string of the configuration change record (see sample above)
     * @param ctx   context giving write access to the broadcast state
     * @param out   unused; configuration rows produce no output
     * @throws RuntimeException if the create-table statement fails
     */
    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {

        //1. Parse the record and extract the new row image
        JSONObject jsonObject = JSON.parseObject(value);
        String after = jsonObject.getString("after");
        if (after == null) {
            return;
        }
        TableProcess tableProcess = JSON.parseObject(after, TableProcess.class);

        //2. Create the sink table if it does not exist yet
        checkTable(tableProcess.getSinkTable(),
                tableProcess.getSinkColumns(),
                tableProcess.getSinkPk(),
                tableProcess.getSinkExtend());

        //3. Publish the configuration to broadcast state
        BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        broadcastState.put(tableProcess.getSourceTable(), tableProcess);
    }

    /**
     * Builds and executes a {@code create table if not exists} statement for the sink table.
     *
     * @param sinkTable   name of the table to create
     * @param sinkColumns comma-separated column names; every column is created as varchar
     * @param sinkPk      primary-key column; defaults to {@code "id"} when null or blank
     * @param sinkExtend  text appended after the column list; treated as empty when null
     * @throws RuntimeException wrapping the {@link SQLException} if the statement cannot be executed
     */
    private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {

        String createTableSQL = buildCreateTableSql(sinkTable, sinkColumns, sinkPk, sinkExtend);

        System.out.println("建表语句为：" + createTableSQL);

        // try-with-resources closes the statement and then returns the connection exactly once,
        // on both the success and the failure path.
        try (DruidPooledConnection connection = druidDataSource.getConnection();
             PreparedStatement preparedStatement = connection.prepareStatement(createTableSQL)) {
            preparedStatement.execute();
        } catch (SQLException e) {
            // Keep the SQLException as the cause so the root failure is not lost.
            throw new RuntimeException("建表失败：" + sinkTable, e);
        }
    }

    /**
     * Assembles the DDL text, e.g.
     * {@code create table if not exists db.tn(id varchar primary key,name varchar) xxx}.
     */
    private String buildCreateTableSql(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {

        String primaryKey = (sinkPk == null || sinkPk.trim().isEmpty()) ? "id" : sinkPk.trim();
        String extend = sinkExtend == null ? "" : sinkExtend;

        StringBuilder sql = new StringBuilder("create table if not exists ")
                .append(GmallConfig.HBASE_SCHEMA)
                .append(".")
                .append(sinkTable)
                .append("(");

        String[] columns = sinkColumns.split(",");
        for (int i = 0; i < columns.length; i++) {
            // Trim so that "id, tm_name" yields the same column names the filter matches against.
            String column = columns[i].trim();

            sql.append(column).append(" varchar");
            if (primaryKey.equals(column)) {
                sql.append(" primary key");
            }

            // Separator between columns, but not after the last one
            if (i < columns.length - 1) {
                sql.append(",");
            }
        }

        return sql.append(")").append(extend).toString();
    }

    //Sample value (data payload abbreviated):{"database":"gmall","table":"cart_info","type":"update","ts":1592270938,"xid":13090,"xoffset":1573,"data":{"id":100924,"user_id":"93","sku_id":16,"cart_price":4488.00,"sku_num":1,"is_ordered":1,"order_time":"2021-10-17 09:28:58","source_type":"2401","source_id":null},"old":{"is_ordered":0,"order_time":null}}
    /**
     * Handles one change record from the main stream.
     *
     * <p>Looks up the record's {@code table} in broadcast state. If a configuration exists and the
     * record's {@code type} is {@code insert}, {@code update} or {@code bootstrap-insert}, the
     * {@code data} object is reduced to the configured sink columns, a {@code sinkTable} field is
     * added and the record is emitted. Records of other types are dropped silently; records for
     * tables without configuration are dropped with a console message.
     *
     * @param value the change record (see sample above); modified in place before being emitted
     * @param ctx   context giving read-only access to the broadcast state
     * @param out   collector receiving the filtered records
     */
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {

        //1. Look up the configuration for this record's table
        ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        String key = value.getString("table");
        String type = value.getString("type");
        TableProcess tableProcess = broadcastState.get(key);

        if (tableProcess == null) {
            System.out.println("没有该维度表" + key + "信息！！！");
            return;
        }

        //2. Keep only insert / update / bootstrap-insert records
        if ("insert".equals(type) || "update".equals(type) || "bootstrap-insert".equals(type)) {

            filter(value.getJSONObject("data"), tableProcess.getSinkColumns());

            //3. Add the sinkTable field and emit
            value.put("sinkTable", tableProcess.getSinkTable());
            out.collect(value);
        }
    }

    /**
     * Removes from {@code data} every field that is not one of the sink columns.
     *
     * @param data        {"id":"12","tm_name":"atguigu","logo_url":"xxx.xxx"}; modified in place
     * @param sinkColumns "id,tm_name"
     */
    private void filter(JSONObject data, String sinkColumns) {

        // Hash set gives constant-time membership checks; names are trimmed to match table creation.
        Set<String> allowedColumns = new HashSet<>();
        for (String column : sinkColumns.split(",")) {
            allowedColumns.add(column.trim());
        }

        data.entrySet().removeIf(entry -> !allowedColumns.contains(entry.getKey()));
    }

}
